package com.atguigu.flink.chapter11;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/12/21 10:04
 */
public class Flink07_SQL_kafka {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 20000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        
        // 建立一个表, 与文件关联
        tEnv.executeSql("create table sensor(" +
                            "   id string, " +
                            "   ts bigint, " +
                            "   vc int" +
                            ")with(" +
                            "  'connector' = 'kafka',\n" +
                            "  'topic' = 's1',\n" +
                            "  'properties.bootstrap.servers' = 'hadoop162:9092',\n" +
                            "  'properties.group.id' = 'Flink07_SQL_kafka',\n" +
                            "  'scan.startup.mode' = 'latest-offset',\n" +
                            "  'format' = 'json'" +
                            ")");
        
    
        tEnv.executeSql("create table s_out(" +
                            "   id string, " +
                            "   ts bigint, " +
                            "   vc int" +
                            ")with(" +
                            "  'connector' = 'kafka',\n" +
                            "  'topic' = 's2',\n" +
                            "  'properties.bootstrap.servers' = 'hadoop162:9092',\n" +
                            "  'format' = 'json',\n" +
                            "  'sink.partitioner' = 'round-robin'\n" +
                            ")");
        
        
        tEnv.executeSql("insert into s_out select * from sensor");
        
    }
}
